Join devRant
Do all the things like
++ or -- rants, post your own rants, comment on others' rants and build your customized dev avatar
Sign Up
Pipeless API

From the creators of devRant, Pipeless lets you power real-time personalized recommendations and activity feeds using a simple API
Learn More
Search - "data streaming"
-
I spent the last three weeks+ (literally THREE full weeks, weekends too) building something I thought was really cool, powerful, and useful. Made a blog post, posted a giant thread on the company Twitter.
Literally one person gave it a like.
I don't know why I give shit anymore, cuz nowadays if it isn't about getting rich quick, cHaTgPt, or some other made up hype, no one cares. Apparently I shouldn't either...
Meanwhile my 16 GIGABYTE RAM MAC, yes 16 GIGABYTE RAM can't even hold power while plugged in, and I'm still clowning around with an ancient iPhone 6 (actually one of my mom's old iphones) that barely stays above 20% battery for more than an hour...
And FINALLY, my FUCKING ISP is for sure screwing me, since I've been doing some hard core data streaming and broadcasting, even though I pay $60+ month for that shit it, keeps dropping out, shit doesn't load.... I mean wtf this isn't 1990 dialup AOL anymore
When I step back I just feel like the worlds biggest loser, maybe the world's biggest 🤡8 -
what are the basics I should know about "data streaming" for working on video streaming companies as a future senior backend Golang developer?4
-
I am trying to extract data from the PubSub subscription and finally, once the data is extracted I want to do some transformation. Currently, it's in bytes format. I have tried multiple ways to extract the data in JSON format using custom schema it fails with an error
TypeError: __main__.MySchema() argument after ** must be a mapping, not str [while running 'Map to MySchema']
**readPubSub.py**
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import typing
class MySchema(typing.NamedTuple):
user_id:str
event_ts:str
create_ts:str
event_id:str
ifa:str
ifv:str
country:str
chip_balance:str
game:str
user_group:str
user_condition:str
device_type:str
device_model:str
user_name:str
fb_connect:bool
is_active_event:bool
event_payload:str
TOPIC_PATH = "projects/nectar-259905/topics/events"
def run(pubsub_topic):
options = PipelineOptions(
streaming=True
)
runner = 'DirectRunner'
print("I reached before pipeline")
with beam.Pipeline(runner, options=options) as pipeline:
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| 'Map to MySchema' >> beam.Map(lambda msg: MySchema(**msg)).with_output_types(MySchema)
| "Writing to console" >> beam.Map(print))
print("I reached after pipeline")
result = message.run()
result.wait_until_finish()
run(TOPIC_PATH)
If I use it directly below
message=(
pipeline
| "Read from Pub/Sub topic" >> beam.io.ReadFromPubSub(subscription='projects/triple-nectar-259905/subscriptions/bq_subscribe')#.with_output_types(bytes)
| 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
| "Writing to console" >> beam.Map(print))
I get output as
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:42:52.283 UTC',
'event_id': 'Game_Request_Declined',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9140',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
{
'user_id': '102105290400258488',
'event_ts': '2021-05-29 20:54:38.297 UTC',
'event_id': 'Decline_Game_Request',
'ifa': '6090a6c7-4422-49b5-8757-ccfdbad',
'ifv': '3fc6eb8b4d0cf096c47e2252f41',
'country': 'US',
'chip_balance': '9905',
'game': 'gru',
'user_group': '[1, 36, 529702]',
'user_condition': '[1, 36]',
'device_type': 'phone',
'device_model': 'TCL 5007Z',
'user_name': 'Minnie',
'fb_connect': True,
'event_payload': '{"competition_type":"normal","game_started_from":"result_flow_rematch","variant":"target"}',
'is_active_event': True
}
Please let me know if I m doing something wrong while parsing the data to JSON. Also, I am looking for examples to do data masking and run some SQL within Apache Beam4